feat(asyncio): add Reader API#309
Open
BewareMyPower wants to merge 3 commits into
Open
Conversation
There was a problem hiding this comment.
Pull request overview
Note
Copilot couldn't run its full agentic review because no GitHub Actions runner was available. Make sure your repository has a runner available to run Copilot's review, or add a copilot-setup-steps.yml file specifying one with the runs-on attribute. See the docs for more details.
Adds an asyncio-compatible Reader API to the Python Pulsar client, including C++ binding support and new integration tests.
Changes:
- Introduced
pulsar.asyncio.Readerwith asyncread_next,seek,has_message_available, andclose. - Added pybind11 C++ exports for Reader async methods and Client async reader creation.
- Added asyncio test coverage for basic reader flows (start positions, seek, message availability, auth failure).
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| tests/asyncio_test.py | Adds asyncio integration tests validating new Reader behaviors (read, seek, has-message, auth). |
| src/reader.cc | Exposes Reader async operations (read_next_async, seek_async, etc.) via pybind11 with GIL release. |
| src/client.cc | Exposes async reader creation APIs (create_reader_async, create_reader_async_v2) for asyncio wrapper use. |
| pulsar/asyncio.py | Implements the new Reader wrapper and Client.create_reader() coroutine. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| self._reader = reader | ||
| self._schema = schema | ||
|
|
||
| async def read_next(self, timeout_millis: int | None = None) -> pulsar.Message: |
Comment on lines
+492
to
+498
| future = asyncio.get_running_loop().create_future() | ||
| if timeout_millis is None: | ||
| self._reader.read_next_async(functools.partial(_set_future, future)) | ||
| else: | ||
| _check_type(int, timeout_millis, 'timeout_millis') | ||
| self._reader.read_next_async(functools.partial(_set_future, future)) | ||
| msg = await future |
Comment on lines
+58
to
+61
| void Reader_readNextAsync(Reader& reader, ReadNextCallback callback) { | ||
| py::gil_scoped_release release; | ||
| reader.readNextAsync(callback); | ||
| } |
Comment on lines
87
to
+90
| .def("topic", &Reader::getTopic, return_value_policy::copy) | ||
| .def("read_next", &Reader_readNext) | ||
| .def("read_next", &Reader_readNextTimeout) | ||
| .def("read_next_async", &Reader_readNextAsync) |
| msg = await consumer.receive() | ||
| self.assertEqual(msg.data(), b'msg-3') | ||
|
|
||
| async def test_reader_simple(self): |
Comment on lines
+479
to
+480
| with self.assertRaises(asyncio.TimeoutError): | ||
| await asyncio.wait_for(reader.read_next(), 1) |
RobertIndie
reviewed
Jun 23, 2026
| PulsarException | ||
| """ | ||
| future = asyncio.get_running_loop().create_future() | ||
| if timeout_millis is None: |
Member
There was a problem hiding this comment.
Seems the timeout_millis parameter is useless.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
No description provided.